feat: support file-level parquet row selections #22940
Conversation
There was a problem hiding this comment.
@haohuaijin
Thanks for the PR. I do not see any blocking issues, just a few suggestions that could make the implementation a bit simpler and help protect the new extension behavior.
kosiew
left a comment
There was a problem hiding this comment.
Thanks for the follow-up.
| selection: RowSelection, | ||
| row_group_meta_data: &[RowGroupMetaData], | ||
| ) -> Result<Self> { | ||
| let selectors: Vec<RowSelector> = selection.into(); |
There was a problem hiding this comment.
The performance concern around RowSelection::split_off makes sense. That said, this manual current / leading / mixed cursor logic is a bit harder to audit, especially since the full selector vector is still materialized up front.
If we keep this version, it would be helpful to add a short comment or benchmark note explaining the measured reason for the manual approach. Otherwise, the earlier split_off shape feels easier to reason about because it directly captures the boundary-splitting invariant.
There was a problem hiding this comment.
@kosiew thank you for the suggestion. I restructured the code; can you check again?
I ran a benchmark comparing split_off with this PR. I added the bench.rs below as datafusion-examples/examples/bench.rs:
cargo build --release -p datafusion-examples --example bench
/usr/bin/time -l target/release/examples/bench split_off heavy 100
/usr/bin/time -l target/release/examples/bench new heavy 100

Results for the heavy scenario, run 100 times:
| Implementation | Total elapsed | Avg/iteration | Max RSS | Peak footprint |
|---|---|---|---|---|
| split_off | ~2055 ms | ~20.55 ms | ~185 MB | ~181 MB |
| new | ~592 ms | ~5.92 ms | ~137 MB | ~133 MB |
benchmark code
use datafusion::datasource::physical_plan::parquet::{ParquetAccessPlan, RowGroupAccess};
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
use std::time::Instant;
/// Number of rows reported by every synthetic row group built in `row_group_metadata`.
const ROWS_PER_RG: usize = 130_000;
/// Number of synthetic row groups in the simulated file.
const NUM_ROW_GROUPS: usize = 39;
/// Total number of rows in the simulated file (all row groups combined).
const TOTAL_ROWS: usize = ROWS_PER_RG * NUM_ROW_GROUPS;
/// Shared signature of the implementations under comparison, so either one can
/// be selected by name on the command line and passed to `run_one`.
type BenchFn = fn(RowSelection, &[RowGroupMetaData]) -> ParquetAccessPlan;
/// Cursor over the selectors of a file-wide `RowSelection`.
///
/// It hands the selection out in bounded pieces: a selector that is longer
/// than the requested number of rows is split, and the unconsumed part stays
/// in `current` for the next request.
struct OverallRowSelectionCursor {
/// Selectors that have not been reached yet.
selector_iter: std::vec::IntoIter<RowSelector>,
/// Selector currently being consumed (only its not-yet-taken rows), or
/// `None` once every selector has been handed out.
current: Option<RowSelector>,
}
impl OverallRowSelectionCursor {
    /// Creates a cursor positioned on the first selector of `selection`.
    fn new(selection: RowSelection) -> Self {
        let all_selectors: Vec<RowSelector> = selection.into();
        let mut selector_iter = all_selectors.into_iter();
        let current = selector_iter.next();
        Self {
            selector_iter,
            current,
        }
    }

    /// Removes at most `max_rows` rows from the front of the selection.
    ///
    /// The returned selector keeps the skip/select flag of the selector it was
    /// cut from. Returns `None` once the selection is exhausted.
    #[inline]
    fn take(&mut self, max_rows: usize) -> Option<RowSelector> {
        let RowSelector {
            row_count: available,
            skip,
        } = self.current?;
        let taken = available.min(max_rows);
        let leftover = available - taken;
        self.current = match leftover {
            // The current selector is fully consumed: advance to the next one.
            0 => self.selector_iter.next(),
            // Otherwise keep the unconsumed tail for the next call.
            _ => Some(RowSelector {
                row_count: leftover,
                skip,
            }),
        };
        Some(RowSelector {
            row_count: taken,
            skip,
        })
    }

    /// Consumes the cursor and returns how many rows (selected or skipped)
    /// were never taken from it.
    fn remaining_rows(self) -> usize {
        self.current
            .into_iter()
            .chain(self.selector_iter)
            .map(|selector| selector.row_count)
            .sum()
    }
}
/// Accumulates the selectors that fall inside one row group and tracks how
/// many of its rows are selected, skipped, or not yet covered.
struct RowGroupAccessBuilder {
/// Selectors pushed so far for this row group, in order.
selectors: Vec<RowSelector>,
/// Rows covered by non-skip selectors.
selected: usize,
/// Rows covered by skip selectors.
skipped: usize,
/// Rows of the row group not yet covered by any pushed selector.
remaining: usize,
}
impl RowGroupAccessBuilder {
    /// Starts an empty builder for a row group holding `row_group_rows` rows.
    fn new(row_group_rows: usize) -> Self {
        Self {
            remaining: row_group_rows,
            selected: 0,
            skipped: 0,
            selectors: Vec::with_capacity(1),
        }
    }

    /// Records `selector` as the next piece of this row group.
    ///
    /// The caller must not push more rows than `remaining`; the subtraction
    /// below would otherwise underflow.
    #[inline]
    fn push(&mut self, selector: RowSelector) {
        let rows = selector.row_count;
        self.remaining -= rows;
        let tally = if selector.skip {
            &mut self.skipped
        } else {
            &mut self.selected
        };
        *tally += rows;
        self.selectors.push(selector);
    }

    /// Collapses the collected selectors into the cheapest access kind:
    /// `Skip` when nothing was selected, `Scan` when nothing was skipped, and
    /// an explicit `Selection` otherwise.
    fn into_access(self) -> RowGroupAccess {
        match (self.selected, self.skipped) {
            (0, _) => RowGroupAccess::Skip,
            (_, 0) => RowGroupAccess::Scan,
            _ => RowGroupAccess::Selection(self.selectors.into()),
        }
    }
}
/// Builds a schema descriptor for a group named "schema" that contains a
/// single INT32 primitive column "a".
fn schema_descr() -> SchemaDescPtr {
    use parquet::basic::Type as PhysicalType;
    use parquet::schema::types::Type as SchemaType;
    use std::sync::Arc;

    let int_column = SchemaType::primitive_type_builder("a", PhysicalType::INT32)
        .build()
        .unwrap();
    let root = SchemaType::group_type_builder("schema")
        .with_fields(vec![Arc::new(int_column)])
        .build()
        .unwrap();
    Arc::new(SchemaDescriptor::new(Arc::new(root)))
}
/// Builds metadata for `NUM_ROW_GROUPS` identical row groups, each reporting
/// `ROWS_PER_RG` rows in its single column.
fn row_group_metadata() -> Vec<RowGroupMetaData> {
    let descr = schema_descr();
    let rows_per_group = ROWS_PER_RG as i64;
    let mut row_groups = Vec::with_capacity(NUM_ROW_GROUPS);
    for _ in 0..NUM_ROW_GROUPS {
        let column = ColumnChunkMetaData::builder(descr.column(0))
            .set_num_values(rows_per_group)
            .build()
            .unwrap();
        let row_group = RowGroupMetaData::builder(descr.clone())
            .set_num_rows(rows_per_group)
            .set_column_metadata(vec![column])
            .build()
            .unwrap();
        row_groups.push(row_group);
    }
    row_groups
}
/// Builds a selection covering exactly `TOTAL_ROWS` rows, split into
/// `num_pairs` chunks where each non-empty chunk skips all of its rows except
/// the last one, which is selected.
///
/// All chunks have `TOTAL_ROWS / num_pairs` rows except the final chunk, which
/// absorbs the remainder. With `num_pairs == 0` there is nothing to select, so
/// the result skips the whole file instead of dividing by zero.
fn scattered_selection(num_pairs: usize) -> RowSelection {
    if num_pairs == 0 {
        // Guard the division below; still cover every row of the file.
        return RowSelection::from(vec![RowSelector::skip(TOTAL_ROWS)]);
    }
    let mut selectors: Vec<RowSelector> = Vec::with_capacity(num_pairs * 2);
    let chunk = TOTAL_ROWS / num_pairs;
    let mut acc = 0usize;
    for i in 0..num_pairs {
        // The last chunk takes whatever the integer division left over.
        let this = if i == num_pairs - 1 {
            TOTAL_ROWS - acc
        } else {
            chunk
        };
        // Select a single row per chunk (none when the chunk is empty).
        let sel = this.min(1);
        let skp = this - sel;
        if skp > 0 {
            selectors.push(RowSelector::skip(skp));
        }
        if sel > 0 {
            selectors.push(RowSelector::select(sel));
        }
        acc += this;
    }
    RowSelection::from(selectors)
}
/// Cursor-based implementation: walks the file-wide `selection` once and cuts
/// it at row-group boundaries, producing one `RowGroupAccess` per row group.
///
/// Panics if the selection does not cover exactly as many rows as the row
/// groups contain.
fn new_try_new(
    selection: RowSelection,
    row_group_meta_data: &[RowGroupMetaData],
) -> ParquetAccessPlan {
    let mut cursor = OverallRowSelectionCursor::new(selection);
    let mut consumed_rows = 0usize;
    let mut file_rows = 0usize;
    let mut row_groups = Vec::with_capacity(row_group_meta_data.len());

    for rg_meta in row_group_meta_data {
        let rg_rows = rg_meta.num_rows() as usize;
        file_rows += rg_rows;

        let mut builder = RowGroupAccessBuilder::new(rg_rows);
        // Fill this row group until it is fully covered or the selection ends.
        while builder.remaining > 0 {
            match cursor.take(builder.remaining) {
                Some(selector) => {
                    consumed_rows += selector.row_count;
                    builder.push(selector);
                }
                None => break,
            }
        }
        row_groups.push(builder.into_access());
    }

    // Rows left in the cursor mean the selection is longer than the file.
    let selection_rows = consumed_rows + cursor.remaining_rows();
    assert_eq!(selection_rows, file_rows, "NEW: row count mismatch");
    ParquetAccessPlan::new(row_groups)
}
/// Baseline implementation: repeatedly calls `RowSelection::split_off` to peel
/// one row group's worth of rows off the front of the file-wide `selection`.
///
/// Panics if the selection does not cover exactly as many rows as the row
/// groups contain.
fn split_off_try_new(
    selection: RowSelection,
    row_group_meta_data: &[RowGroupMetaData],
) -> ParquetAccessPlan {
    let mut tail = selection;
    let mut covered_rows = 0usize;
    let mut file_rows = 0usize;
    let mut row_groups = Vec::with_capacity(row_group_meta_data.len());

    for rg_meta in row_group_meta_data {
        let rg_rows = rg_meta.num_rows() as usize;
        file_rows += rg_rows;

        let head = tail.split_off(rg_rows);
        let (selected, skipped) = (head.row_count(), head.skipped_row_count());
        covered_rows += selected + skipped;

        row_groups.push(match (selected, skipped) {
            (0, _) => RowGroupAccess::Skip,
            (_, 0) => RowGroupAccess::Scan,
            _ => RowGroupAccess::Selection(head),
        });
    }

    // Whatever is still in `tail` lies beyond the last row group.
    covered_rows += tail.row_count() + tail.skipped_row_count();
    assert_eq!(covered_rows, file_rows, "SPLIT_OFF: row count mismatch");
    ParquetAccessPlan::new(row_groups)
}
/// Runs `bench_fn` `iterations` times against a fresh clone of the same
/// scattered selection and prints the total elapsed time.
///
/// The selection is cloned inside the timed loop, so the clone cost is part of
/// the reported time for every implementation.
fn run_one(
    name: &str,
    bench_fn: BenchFn,
    label: &str,
    num_pairs: usize,
    iterations: usize,
) {
    let row_groups = row_group_metadata();
    let selection = scattered_selection(num_pairs);
    let s = selection.iter().count();

    let timer = Instant::now();
    (0..iterations).for_each(|_| {
        // black_box keeps the optimizer from discarding the unused plan.
        std::hint::black_box(bench_fn(selection.clone(), &row_groups));
    });
    let elapsed = timer.elapsed();

    println!(
        "{name} {label} selectors={s} iterations={iterations} elapsed={:?}",
        elapsed
    );
}
/// Resolves a command-line implementation name to its canonical name and
/// benchmark function, or `None` when the name is not recognised.
fn implementation(value: &str) -> Option<(&'static str, BenchFn)> {
    let known: [(&'static str, BenchFn); 2] = [
        ("new", new_try_new),
        ("split_off", split_off_try_new),
    ];
    known.iter().copied().find(|&(name, _)| name == value)
}
/// Resolves a command-line scenario name to its canonical label and the
/// number of skip/select pairs to generate, or `None` for an unknown name.
fn scenario(value: &str) -> Option<(&'static str, usize)> {
    let (label, num_pairs) = match value {
        "coarse" => ("coarse", 40),
        "medium" => ("medium", 50_000),
        "heavy" => ("heavy", 1_000_000),
        _ => return None,
    };
    Some((label, num_pairs))
}
/// Prints the command-line usage to stderr and terminates the process with
/// exit status 2. Never returns.
fn usage(program: &str) -> ! {
    let message = format!(
        "Usage: {program} <new|split_off> <coarse|medium|heavy> [iterations]\n\
         Example: {program} new heavy 1"
    );
    eprintln!("{message}");
    std::process::exit(2)
}
fn main() {
let args = std::env::args().collect::<Vec<_>>();
let program = args.first().map(String::as_str).unwrap_or("bench");
if !(3..=4).contains(&args.len()) {
usage(&program);
}
let Some((name, bench_fn)) = implementation(&args[1]) else {
usage(&program);
};
let Some((label, num_pairs)) = scenario(&args[2]) else {
usage(&program);
};
let iterations = match args.get(3) {
Some(value) => value.parse().unwrap_or_else(|_| usage(&program)),
None => 1,
};
if iterations == 0 {
usage(&program);
}
run_one(name, bench_fn, label, num_pairs, iterations);
}
Which issue does this PR close?
RowSelection #22939

Rationale for this change
RowSelection #22939

What changes are included in this PR?
ParquetRowSelection.
ParquetAccessPlan::try_new_from_overall_row_selection.
ParquetAccessPlan or ParquetRowSelection.
ParquetSource.

Are these changes tested?
Yes. This PR adds tests for:
ParquetRowSelection
ParquetAccessPlan and ParquetRowSelection on the same file

Are there any user-facing changes?
Yes. This adds a new public `ParquetRowSelection` type for callers that want to attach a file-level Parquet `RowSelection` to a `PartitionedFile`.